package com.hopu.shop.time;

import com.alibaba.fastjson.JSONObject;
import com.hopu.bean.time.TimeSexCount;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

/**
 * Spark batch job that counts user-session records per (hour, sex) pair and
 * appends the counts to a MySQL table.
 *
 * <p>Each line of the input log is parsed as a JSON object; the job reads the
 * {@code odate.hour} and {@code sex} fields from it. For every hour value
 * {@code "0"} .. {@code "23"} the matching records are grouped by hour and sex,
 * the counts are printed, and the result is appended to {@code time_sex_count}.
 */
public class TimeSexCountAna {

    /** JDBC URL of the MySQL database the counts are written to. */
    private static final String JDBC_URL =
            "jdbc:mysql://192.168.136.200:3306/data_ana?useUnicode=true&characterEncoding=UTF-8";

    /** Name of the table the per-hour counts are appended to. */
    private static final String TARGET_TABLE = "time_sex_count";

    /**
     * Entry point: parses the session log once, then aggregates and stores the
     * counts for each of the 24 hour slots.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .config("spark.driver.allowMultipleContexts", "true")
                .config("spark.sql.crossJoin.enabled", "true")
                .appName("timeSexCountAna")
                .master("local").getOrCreate();
        try {
            JavaSparkContext context = new JavaSparkContext(spark.sparkContext());
            JavaRDD<String> rdd = context.textFile("D://word/user_session.log");
            JavaRDD<TimeSexCount> map = rdd.map(t -> {
                JSONObject json = (JSONObject) JSONObject.parse(t);
                String hour = json.getJSONObject("odate").getString("hour");
                String sex = json.getString("sex");
                return new TimeSexCount(hour, sex);
            });

            // The loop below triggers several Spark actions per hour on the same
            // RDD. Without caching, every action would re-read the file and
            // re-parse every JSON line.
            map.cache();
            try {
                // Process the records of each hour slot separately.
                for (int i = 0; i < 24; i++) {
                    transAndPut(map, String.valueOf(i), spark);
                }
            } finally {
                map.unpersist();
            }
        } finally {
            // Always release the Spark resources, even when a step fails.
            spark.stop();
        }
    }

    /**
     * Filters the records of one hour, counts them per (hour, sex), prints the
     * result and appends it to the MySQL table.
     *
     * @param map   parsed session records
     * @param hour  hour value to select; compared with {@code equals} against
     *              {@code TimeSexCount.getHour()}
     * @param spark session used to turn the filtered RDD into a DataFrame
     */
    public static void transAndPut(JavaRDD<TimeSexCount> map, String hour, SparkSession spark) {
        JavaRDD<TimeSexCount> hourRDD = map.filter(t -> hour.equals(t.getHour()));

        Dataset<Row> df = spark.createDataFrame(hourRDD, TimeSexCount.class);

        df = df.groupBy("hour", "sex").count();

        df.show();

        // Write to MySQL.
        df.write().mode(SaveMode.Append).jdbc(JDBC_URL, TARGET_TABLE, connectionProperties());
    }

    /** Builds the JDBC connection properties (driver class and credentials). */
    private static Properties connectionProperties() {
        // NOTE(review): credentials are hard-coded here; consider moving them to
        // external configuration.
        Properties pro = new Properties();
        pro.setProperty("driver", "com.mysql.jdbc.Driver");
        pro.setProperty("user", "root");
        pro.setProperty("password", "123456");
        return pro;
    }
}
